# Copyright (c) HySoP 2011-2024
#
# This file is part of HySoP software.
# See "https://particle_methods.gricad-pages.univ-grenoble-alpes.fr/hysop-doc/"
# for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import warnings
import itertools as it
from abc import ABCMeta, abstractmethod
from hysop import (
__KERNEL_DEBUG__,
__TRACE_KERNELS__,
__TRACE_NOCOPY__,
__TRACE_NOACCUMULATE__,
)
from hysop.tools.decorators import debug
from hysop.tools.htypes import check_instance, first_not_None
from hysop.tools.numpywrappers import npw
from hysop.backend.device.opencl import cl, __OPENCL_PROFILE__
from hysop.backend.device.opencl.opencl_kernel_statistics import OpenClKernelStatistics
from hysop.tools.warning import HysopWarning
from hysop.tools.profiler import FProfiler
def should_trace_kernel(kernel_msg):
assert isinstance(kernel_msg, str)
kernel_msg = kernel_msg.strip()
if __TRACE_NOCOPY__ and kernel_msg.startswith("enqueue_copy"):
return False
elif __TRACE_NOACCUMULATE__ and kernel_msg.startswith("add<<<"):
return False
else:
return True
should_profile_kernel = should_trace_kernel
if __KERNEL_DEBUG__ or __TRACE_KERNELS__:
def trace_kernel(kernel_msg):
if should_trace_kernel(kernel_msg):
print(kernel_msg)
else:
def trace_kernel(kernel_msg):
pass
if __OPENCL_PROFILE__:
def profile_kernel(kernel, evt, kernel_msg=None, fprofiler=None):
evt.wait()
if kernel is None:
assert kernel_msg is not None
else:
assert kernel_msg is None
show_profiling_info = getattr(kernel, "_show_profiling_info", True)
if show_profiling_info:
if not hasattr(kernel, "_apply_msg"):
msg = "Kernel of type {} has no '_apply_msg' attribute, this is required for profiling."
                    msg = msg.format(type(kernel).__name__)
                    raise AttributeError(msg)
kernel_msg = kernel._apply_msg
if (
__KERNEL_DEBUG__
and (kernel_msg is not None)
and should_profile_kernel(kernel_msg)
):
print(f"{evt.profile.end - evt.profile.start} | {kernel_msg.strip()}")
        if fprofiler is not None:
fprofiler[kernel_msg] += (evt.profile.end - evt.profile.start) * 1e-9
else:
def profile_kernel(kernel, evt, kernel_msg=None, fprofiler=None):
pass
class OpenClKernelListLauncher:
"""
    Wraps multiple OpenCL kernels, ready to be enqueued without extra arguments.
    All kernels contained in a KernelListLauncher are called with the same queue
    (i.e. the OpenClKernelLauncher default_queue is not taken into account, and all
    kernels should at least have a pre-configured global_work_size).
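
    Example (illustrative sketch, not taken from the HySoP sources; assumes `k0` and
    `k1` are OpenClKernelLauncher instances with pre-configured global work sizes,
    and `queue` is a valid cl.CommandQueue)::

        launcher = OpenClKernelListLauncher(name="rhs")
        launcher += k0
        launcher += k1
        evt = launcher(queue=queue)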
"""
@debug
def __init__(self, name, profiler=None):
"""
        Create an OpenClKernelListLauncher.
Parameters
----------
name: str
A name for this kernel list launcher (for logging purposes).
        profiler: FProfiler, optional
            Optional profiler used to accumulate kernel execution times.
"""
check_instance(name, str)
self._name = name
self._kernels = ()
self._parameters = {}
self._apply_msg = f">OpenClKernelListLauncher {name}"
self._profiler = profiler
def push_copy_host_device(
self,
varname,
src,
dst,
src_device_offset=None,
dst_device_offset=None,
byte_count=None,
):
"""Shortcut for OpenClCopyBuffer kernels creation."""
from hysop.backend.device.opencl.opencl_copy_kernel_launchers import (
OpenClCopyBufferLauncher,
)
kernel = OpenClCopyBufferLauncher(
varname=varname,
src=src,
dst=dst,
byte_count=byte_count,
src_device_offset=src_device_offset,
dst_device_offset=dst_device_offset,
)
self.push_kernels(kernel)
return self
def push_copy_host_to_device(self, varname, src, dst, dst_device_offset=None):
"""Shortcut for OpenClCopyHost2Device kernels creation."""
from hysop.backend.device.opencl.opencl_copy_kernel_launchers import (
OpenClCopyHost2DeviceLauncher,
)
kernel = OpenClCopyHost2DeviceLauncher(
varname=varname, src=src, dst=dst, dst_device_offset=dst_device_offset
)
self.push_kernels(kernel)
return self
def push_copy_device_to_host(self, varname, src, dst, src_device_offset=None):
"""Shortcut for OpenClCopyDevice2Host kernels creation."""
from hysop.backend.device.opencl.opencl_copy_kernel_launchers import (
OpenClCopyDevice2HostLauncher,
)
kernel = OpenClCopyDevice2HostLauncher(
varname=varname, src=src, dst=dst, src_device_offset=src_device_offset
)
self.push_kernels(kernel)
return self
def push_copy_device_to_device(
self,
varname,
src,
dst,
src_device_offset=None,
dst_device_offset=None,
byte_count=None,
):
"""Shortcut for OpenClCopyDevice2Device kernels creation."""
from hysop.backend.device.opencl.opencl_copy_kernel_launchers import (
OpenClCopyDevice2DeviceLauncher,
)
kernel = OpenClCopyDevice2DeviceLauncher(
varname=varname,
src=src,
dst=dst,
byte_count=byte_count,
src_device_offset=src_device_offset,
dst_device_offset=dst_device_offset,
)
self.push_kernels(kernel)
return self
def push_kernels(self, *kernel_launchers):
"""
Push OpenClKernelLaunchers into the list.
None values are ignored for convenience.
"""
for launcher in kernel_launchers:
if launcher is None:
pass
elif isinstance(launcher, LauncherI):
launcher._profiler = self._profiler
if not launcher.global_size_configured():
msg = "OpenClKernelLauncher {} global_work_size has not been configured."
msg = msg.format(launcher.name)
raise RuntimeError(msg)
if isinstance(launcher, OpenClParametrizedKernelLauncher):
parameters = {k: v[1] for (k, v) in launcher.parameters_map.items()}
self._update_parameters_from_parametrized_kernel(
launcher, parameters
)
elif isinstance(launcher, HostLauncherI):
parameters = launcher.parameters()
self._update_parameters_from_parametrized_kernel(
launcher, parameters
)
self._kernels += (launcher,)
elif isinstance(launcher, OpenClKernelListLauncher):
self._update_parameters_from_kernel_list_launcher(launcher)
for kk in launcher._kernels:
kk._profiler = self._profiler
self._kernels += launcher._kernels
else:
msg = "Expected an OpenClKernelLauncher or a OpenClKernelListLauncher "
msg += "but got a {}."
msg = msg.format(type(launcher))
raise TypeError(msg)
return self
def __iadd__(self, kernel):
"""Push a kernel into the list."""
self.push_kernels(kernel)
return self
def __call__(self, queue, wait_for=None, **kwds):
"""
Enqueue all kernels on the given queue in order.
The first enqueued kernel will wait on the wait_for events.
If this OpenClKernelListLauncher is empty, cl.wait_for_events
will be called instead.
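
        Example (sketch; assumes the pushed kernels registered a scalar parameter
        named 'dt', and `queue` is a valid cl.CommandQueue)::

            evt = launcher(queue=queue, dt=npw.float64(0.01))
            evt.wait()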
"""
trace_kernel(self._apply_msg)
if __debug__:
parameters = self._parameters
msg = "Expected the following kernel parameters {} but got {}."
msg = msg.format(
", ".join(f"'{k}'" for k in parameters),
", ".join(f"'{k}'" for k in kwds),
)
assert not (set(parameters.keys()) - set(kwds.keys())), msg
kernels = self._kernels
if kernels:
evt = kernels[0](queue=queue, wait_for=wait_for, **kwds)
for kernel in kernels[1:]:
try:
evt = kernel(queue=queue, **kwds)
except:
msg = "\nFailed to call kernel {} of type {}.\n"
msg = msg.format(kernel.name, type(kernel).__name__)
print(msg)
raise
else:
if __KERNEL_DEBUG__ or __TRACE_KERNELS__:
msg = f"No kernels enqueued for KernelListLauncher::{self.name}"
warnings.warn(msg, HysopWarning)
evt = cl.enqueue_marker(queue=queue, wait_for=wait_for)
return evt
def _update_parameters_from_parametrized_kernel(self, kernel, parameters):
"""
Update parameters of this kernel list launcher from a
OpenClParametrizedKernelLauncher (or HostLauncherI).
"""
check_instance(kernel, (OpenClParametrizedKernelLauncher, HostLauncherI))
check_instance(parameters, dict, keys=str, values=(type, npw.dtype))
sparameters = self._parameters
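        # sparameters maps parameter name -> (parameter type, set of kernel names using it)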
for pname, ptype in parameters.items():
if pname in sparameters:
(stype, op_names) = sparameters[pname]
if stype != ptype:
msg = "Trying to register parameter {} with type {} "
msg += "but it was already registered with type {} by the "
msg += "following operators:\n {}."
msg = msg.format(pname, ptype, stype, ", ".join(op_names))
raise RuntimeError(msg)
sparameters[pname][1].add(kernel.name)
else:
sparameters[pname] = (ptype, {kernel.name})
def _update_parameters_from_kernel_list_launcher(self, kernel_list_launcher):
"""Update parameters of this kernel list launcher from a OpenClKernelListLauncher."""
check_instance(kernel_list_launcher, OpenClKernelListLauncher)
parameters = kernel_list_launcher._parameters
sparameters = self._parameters
for pname, (ptype, knames) in parameters.items():
if pname in sparameters:
(stype, op_names) = sparameters[pname]
if stype != ptype:
msg = "Trying to register parameter {} with type {} "
msg += "but it was already registered with type {} by the "
msg += "following operators:\n {}."
msg = msg.format(pname, ptype, stype, ", ".join(op_names))
raise RuntimeError(msg)
sparameters[pname][1].update(knames)
else:
sparameters[pname] = (ptype, knames)
def _get_name(self):
"""Return the OpenClKernelLauncher name."""
return self._name
def _get_parameters(self):
"""
Return parameters of OpenClParametrizedKernelLauncher.
This is a mapping between the parameter names and
parameter types and operator names.
"""
return self._parameters
def _get_statistics(self):
"""Compute statistics of each kernels and clear events of kernels that finished."""
kernel_statistics = {}
for kernel in self._kernels:
kernel_statistics[kernel.name] = kernel.statistics
return kernel_statistics
name = property(_get_name)
parameters = property(_get_parameters)
statistics = property(_get_statistics)
class LauncherI(metaclass=ABCMeta):
"""
Interface for any object that has the ability to be a launcher.
"""
def __init__(self, name, profiler=None, **kwds):
"""
        Create a launcher.
Parameters
----------
name: str
A name for this kernel launcher (for logging purposes).
kwds: dict
Base class arguments.
"""
super().__init__(**kwds)
check_instance(name, str)
self._name = name
self._events = ()
self._kernel_statistics = OpenClKernelStatistics()
self._profiler = profiler
def _get_name(self):
"""Get the name of this kernel launcher."""
return self._name
def _get_events(self):
"""All events since the last call to update statistics."""
return self._events
def _get_statistics(self):
"""Compute statistics and clear events of kernels that finished."""
old_events = self._events
        finished = tuple(
            evt
            for evt in old_events
            if (evt.command_execution_status == cl.command_execution_status.COMPLETE)
        )
running = tuple(evt for evt in old_events if (evt not in finished))
stats = OpenClKernelStatistics(finished)
self._kernel_statistics += stats
self._events = running
return self._kernel_statistics
def _register_event(self, queue, evt):
"""
Register an event in the event list if the queue has
the PROFILING_ENABLE flag set.
"""
if cl.command_queue_properties.PROFILING_ENABLE & queue.properties:
self._events += (evt,)
name = property(_get_name)
events = property(_get_events)
statistics = property(_get_statistics)
@abstractmethod
def __call__(self, queue=None, wait_for=None, **kwds):
"""
Launch with a specific queue.
Wait for wait_for events before computing.
If queue has profiling enabled, events are pushed into a local list of events
to compute kernel statistics when self.statistics is fetched.
"""
pass
class OpenClKernelLauncherI(LauncherI):
"""
    Interface for any object that has the ability to enqueue an OpenCL kernel
without extra arguments.
"""
@abstractmethod
def __call__(
self,
queue=None,
wait_for=None,
global_work_size=None,
local_work_size=None,
**kwds,
):
"""
Launch kernel with a specific queue.
Wait for wait_for events before computing.
If queue has profiling enabled, events are pushed into a local list of events
to compute kernel statistics when self.statistics is fetched.
"""
def check_kernel_arg(self, arg, arg_id, arg_name, arg_type):
"""Check kernel argument type prior to setargs."""
if not (__KERNEL_DEBUG__ or __TRACE_KERNELS__):
return
if isinstance(arg_type, npw.dtype) or (
isinstance(arg_type, tuple)
and len(arg_type) == 1
and isinstance(arg_type[0], npw.dtype)
):
dtype = arg_type if isinstance(arg_type, npw.dtype) else arg_type[0]
good = isinstance(arg, npw.ndarray)
if not good:
msg = "Argument {}::{} at id {} at does not match required type np.ndarray, "
msg += "got {} instead."
msg = msg.format(self.name, arg_name, arg_id, dtype, type(arg))
raise RuntimeError(msg)
good = arg.dtype == dtype
if not good:
msg = "Argument {}::{} at id {} at does not match required dtype {}, "
msg += "got {} instead."
msg = msg.format(self.name, arg_name, arg_id, dtype, arg.dtype)
raise RuntimeError(msg)
else:
good = isinstance(arg, arg_type)
if not good:
msg = "Argument {}::{} at id {} at does not match required type {}, "
msg += "got {} instead."
msg = msg.format(self.name, arg_name, arg_id, arg_type, type(arg))
raise RuntimeError(msg)
class HostLauncherI(LauncherI):
def __init__(self, name, **kwds):
super().__init__(name=name, **kwds)
self._apply_msg = f" HostLauncher.{name}()"
def __call__(self):
trace_kernel(self._apply_msg)
def parameters(self):
return {}
class OpenClKernelLauncher(OpenClKernelLauncherI):
"""
    Wraps an OpenCL kernel ready to be enqueued without extra arguments.
    Manages the launching of a single OpenCL kernel with fixed arguments.
"""
@debug
def __init__(
self,
name,
kernel,
args_list,
default_global_work_size=None,
default_local_work_size=None,
default_queue=None,
**kwds,
):
"""
        Create an OpenClKernelLauncher.
Parameters
----------
name: str
A name for this kernel launcher (for logging purposes).
kernel: cl.Program or cl.Kernel
The precompiled program that contains the kernel to be launched.
            If a cl.Program is passed, it must contain a single kernel; a dedicated
            kernel instance is extracted and its arguments are set at construction.
        args_list: tuple
            All arguments of the kernel, either as a flat tuple in argument order,
            or as a tuple of (arg_index, arg_value) pairs.
default_queue: cl.CommandQueue, optional
Default queue to run the kernel.
default_global_work_size: tuple of ints, optional
Default global work size.
default_local_work_size: tuple of ints, optional
Default local work size.
kwds: dict
Base class arguments.
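
        Example (sketch; assumes `program` is a cl.Program containing a single kernel,
        `a_buf` and `b_buf` are cl.Buffer objects, and `queue` is a cl.CommandQueue)::

            launcher = OpenClKernelLauncher(
                name="axpy",
                kernel=program,
                args_list=(a_buf, b_buf, npw.float32(2.0)),
                default_global_work_size=(1024,),
            )
            evt = launcher(queue=queue)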
"""
super().__init__(name, **kwds)
check_instance(args_list, tuple)
check_instance(kernel, (cl.Program, cl.Kernel), allow_none=True)
check_instance(default_queue, cl.CommandQueue, allow_none=True)
check_instance(
default_global_work_size, tuple, values=(int, npw.integer), allow_none=True
)
check_instance(
default_local_work_size, tuple, values=(int, npw.integer), allow_none=True
)
if isinstance(kernel, cl.Program):
kernels = kernel.all_kernels()
assert len(kernels) == 1
kernel = kernels[0]
kernel_is_shared = False
elif kernel is None:
kernel_is_shared = False
else:
# set_args will always be called on apply
kernel_is_shared = True
args_per_index = False
if args_list:
if isinstance(args_list[0], tuple):
                aindexes = tuple(x[0] for x in args_list)
                msg = "Duplicate argument indices in args_list."
                assert len(aindexes) == len(set(aindexes)), msg
for index, arg in args_list:
kernel.set_arg(index, arg)
args_per_index = True
else:
kernel.set_args(*args_list)
self._kernel = kernel
self._args_list = args_list
self._args_per_index = args_per_index
self._default_global_work_size = default_global_work_size
self._default_local_work_size = default_local_work_size
self._default_queue = default_queue
self._kernel_is_shared = kernel_is_shared
self._apply_msg = f" {name}<<<>>>"
def as_list_launcher(self, name):
"""Convert a OpenClKernelLauncher to a OpenClKernelListLauncher."""
llauncher = OpenClKernelListLauncher(name=name)
llauncher.push_kernels(self)
return llauncher
def _get_kernel(self):
"""Get the precompiled kernel to be launched."""
        assert not self._kernel_is_shared, "Kernel cannot be shared."
        return self._kernel
    def _get_kernel_is_shared(self):
        """Return True if this kernel may be shared with other callers."""
        return self._kernel_is_shared
def _get_args_list(self):
"""All arguments of the kernel as a tuple."""
return self._args_list
def _get_default_queue(self):
"""Default queue to launch the kernel."""
return self._default_queue
def _get_default_global_work_size(self):
"""Default global work size to launch the kernel."""
return self._default_global_work_size
def _get_default_local_work_size(self):
"""Default default_local_work_size to launch the kernel."""
return self._default_local_work_size
kernel = property(_get_kernel)
args_list = property(_get_args_list)
default_queue = property(_get_default_queue)
def __call__(
self,
queue=None,
wait_for=None,
global_work_size=None,
local_work_size=None,
**kwds,
):
"""
Launch kernel with a specific queue, global_work_size and local_work_size.
Wait for wait_for events before computing.
If queue has profiling enabled, events are pushed into a local list of events
to compute kernel statistics when self.statistics is fetched.
"""
queue = first_not_None(queue, self._default_queue)
global_work_size = first_not_None(
global_work_size, self._default_global_work_size
)
local_work_size = first_not_None(local_work_size, self._default_local_work_size)
assert isinstance(queue, cl.CommandQueue)
assert isinstance(global_work_size, tuple)
assert isinstance(local_work_size, (tuple, type(None)))
apply_msg = self._apply_msg.format(global_work_size, local_work_size)
trace_kernel(apply_msg)
kernel = self._set_kernel_args(**kwds)
evt = cl.enqueue_nd_range_kernel(
queue=queue,
kernel=kernel,
global_work_size=global_work_size,
local_work_size=local_work_size,
wait_for=wait_for,
)
profile_kernel(None, evt, apply_msg, fprofiler=self._profiler)
self._register_event(queue, evt)
return evt
def _set_kernel_args(self, **kwds):
"""Set the arguments of this kernel and return the kernel."""
kernel = self._kernel
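        # a shared kernel may have been used by other launchers in the meantime,
        # so all fixed arguments are (re)set before each launch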
if self._kernel_is_shared:
args_list = self._args_list
if self._args_per_index:
for index, arg in args_list:
kernel.set_arg(index, arg)
else:
kernel.set_args(*self.args_list)
return kernel
class OpenClParametrizedKernelLauncher(OpenClKernelLauncher):
"""
Wraps an OpenCL kernel ready to be enqueued with some extra kernel arguments.
"""
def __init__(self, parameters_map, args_list, **kwds):
"""
        Create an OpenClParametrizedKernelLauncher.
Parameters
----------
args_list: tuple
Known arguments of the kernel, as a tuple of tuples (arg_index, arg_value).
parameters_map: dict
Mapping between unknown parameter names and (parameter_index, parameter_type).
kwds: dict
Base class parameters.
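
        Example (sketch; assumes kernel argument 0 is a fixed cl.Buffer `buf`,
        argument 1 is a runtime scalar named 'dt', and `program`/`queue` are a
        valid cl.Program and cl.CommandQueue)::

            launcher = OpenClParametrizedKernelLauncher(
                name="scale",
                kernel=program,
                args_list=((0, buf),),
                parameters_map={"dt": (1, npw.float32)},
                default_global_work_size=(256,),
            )
            evt = launcher(queue=queue, dt=npw.float32(0.1))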
"""
check_instance(args_list, tuple, values=tuple)
check_instance(parameters_map, dict, keys=str, values=tuple)
pindexes = tuple(x[0] for x in parameters_map.values())
aindexes = tuple(x[0] for x in args_list)
assert len(pindexes) == len(
set(pindexes)
), f"Arguments at same position: {parameters_map}"
        assert len(aindexes) == len(
            set(aindexes)
        ), f"Arguments at same position: {args_list}"
if set(pindexes).intersection(aindexes):
msg = "Overlap between parameters indexes and default argument indexes."
msg += f"\nparameters: {parameters_map}\ndefault args: {args_list}"
raise ValueError(msg)
super().__init__(args_list=args_list, **kwds)
self._parameters_map = parameters_map
def _get_parameters_map(self):
"""
Mapping between parameter names and (parameter_index, parameter_type).
"""
return self._parameters_map
def _set_kernel_args(self, **kwds):
"""Set the arguments of this kernel and return the kernel."""
kernel = super()._set_kernel_args()
for pname, (pindex, ptypes) in self._parameters_map.items():
assert pname in kwds, f"{pname} was not given."
pval = kwds[pname]
self.check_kernel_arg(pval, pindex, pname, ptypes)
kernel.set_arg(pindex, pval)
return kernel
parameters_map = property(_get_parameters_map)
class OpenClKernelParameterGenerator(metaclass=ABCMeta):
"""Abstract base for opencl kernel parameter yielders."""
def __iter__(self):
return self.new_generator()
@abstractmethod
def new_generator(self):
pass
class OpenClKernelParameterYielder(OpenClKernelParameterGenerator):
"""Generate opencl parameters through an external iterator or generator factory."""
def __init__(self, fn):
"""
        Create an OpenClKernelParameterYielder.
Parameters
----------
fn: callable
Lambda, function or functor that takes no arguments
            which should return a Generator or an Iterator upon call.
Example:
lambda: range(10)
"""
assert callable(fn)
self._fn = fn
def new_generator(self):
return self._fn()
class OpenClIterativeKernelLauncher(OpenClParametrizedKernelLauncher):
"""
    Wraps an OpenCL kernel ready to be iteratively enqueued with some extra kernel arguments.
    Extra kernel arguments are split between:
      * classic extra arguments, as with OpenClParametrizedKernelLauncher,
      * iterated arguments that are automatically yielded for each __call__.
    An OpenClIterativeKernelLauncher acts like an OpenClKernelListLauncher
    without the need to generate the list of all kernels with fixed arguments.
"""
def __init__(self, parameters_map, args_list, iterated_parameters, **kwds):
"""
        Create an OpenClIterativeKernelLauncher.
Parameters
----------
args_list: tuple
Known arguments of the kernel, as a tuple of tuples (arg_index, arg_value).
parameters_map: dict
Mapping between unknown parameter names and (parameter_index, parameter_type).
iterated_parameters: dict
Mapping between iterated parameter names and OpenClKernelParameterGenerator.
Iterated parameters should be included in parameters_map as well.
kwds: dict
Base class parameters.
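
        Example (sketch; assumes kernel argument 0 is a runtime scalar 'dt',
        argument 1 is an iterated direction index, and `program`/`queue` are a
        valid cl.Program and cl.CommandQueue)::

            launcher = OpenClIterativeKernelLauncher(
                name="advec_dir",
                kernel=program,
                args_list=(),
                parameters_map={"dt": (0, npw.float32), "direction": (1, npw.int32)},
                iterated_parameters={
                    "direction": OpenClKernelParameterYielder(
                        lambda: map(npw.int32, range(3))
                    ),
                },
                default_global_work_size=(256,),
            )
            evt = launcher(queue=queue, dt=npw.float32(0.1))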
"""
check_instance(args_list, tuple, values=tuple)
check_instance(parameters_map, dict, keys=str, values=tuple)
check_instance(
iterated_parameters, dict, keys=str, values=OpenClKernelParameterGenerator
)
iterated_parameter_arg_ids = ()
iterated_parameter_arg_names = ()
iterated_parameter_arg_types = ()
iterated_parameter_generators = ()
for pname, pgen in iterated_parameters.items():
assert pname in parameters_map
arg_id, arg_type = parameters_map.pop(pname)
iterated_parameter_arg_ids += (arg_id,)
iterated_parameter_arg_names += (pname,)
iterated_parameter_arg_types += (arg_type,)
iterated_parameter_generators += (pgen,)
super().__init__(args_list=args_list, parameters_map=parameters_map, **kwds)
self.iterated_parameters = iterated_parameters
self.iterated_parameter_arg_ids = iterated_parameter_arg_ids
self.iterated_parameter_arg_names = iterated_parameter_arg_names
self.iterated_parameter_arg_types = iterated_parameter_arg_types
self.iterated_parameter_generators = iterated_parameter_generators
self._apply_msg = "{}{}<<<{}, {}>>>({})".format(
"{}", self.name, "{}", "{}", "{}"
)
def iter_parameters(self):
return it.product(*self.iterated_parameter_generators)
def __call__(
self,
queue=None,
wait_for=None,
enqueue_barrier=True,
global_work_size=None,
local_work_size=None,
**kwds,
):
"""
Launch kernel with a specific queue, global_work_size and local_work_size.
Wait for wait_for events before computing.
If queue has profiling enabled, events are pushed into a local list of events
to compute kernel statistics when self.statistics is fetched.
        If the queue is out of order, a barrier is enqueued unless enqueue_barrier is set
        to False. If enqueue_barrier is False, the returned event is None.
"""
queue = first_not_None(queue, self._default_queue)
global_work_size = first_not_None(
global_work_size, self._default_global_work_size
)
local_work_size = first_not_None(local_work_size, self._default_local_work_size)
assert isinstance(queue, cl.CommandQueue)
assert isinstance(global_work_size, tuple)
assert isinstance(local_work_size, (tuple, type(None)))
apply_msg = self._apply_msg.format(
"{}", global_work_size, local_work_size, "{}"
)
trace_kernel(apply_msg.format(" ", "<yielder>"))
kernel = self._set_kernel_args(**kwds)
out_of_order_queue = (
queue.properties & cl.command_queue_properties.OUT_OF_ORDER_EXEC_MODE_ENABLE
)
arg_ids = self.iterated_parameter_arg_ids
arg_types = self.iterated_parameter_arg_types
arg_names = self.iterated_parameter_arg_names
for i, args in enumerate(self.iter_parameters()):
apply_msg = self._apply_msg.format(
"{}", global_work_size, local_work_size, "{}"
)
apply_msg = apply_msg.format(
" | ",
", ".join(f"{pname}={pval}" for (pname, pval) in zip(arg_names, args)),
)
trace_kernel(apply_msg)
for arg_id, arg_name, arg_type, arg_value in zip(
arg_ids, arg_names, arg_types, args
):
                self.check_kernel_arg(arg_value, arg_id, arg_name, arg_type)
kernel.set_arg(arg_id, arg_value)
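            # in-order queues: only the first enqueue needs to wait on wait_for;
            # out-of-order queues must pass wait_for to every enqueue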
evt = cl.enqueue_nd_range_kernel(
queue=queue,
kernel=kernel,
global_work_size=global_work_size,
local_work_size=local_work_size,
wait_for=(wait_for if (i == 0 or out_of_order_queue) else None),
)
profile_kernel(None, evt, apply_msg, fprofiler=self._profiler)
self._register_event(queue, evt)
if out_of_order_queue:
if enqueue_barrier:
cl.enqueue_barrier(queue)
evt = cl.enqueue_marker(queue)
else:
evt = None
return evt